package com.renrendoc.network.flow.ana;

import com.renrendoc.network.flow.beans.out.PageEvent;
import com.renrendoc.network.flow.beans.out.TopNPage;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * 聚合输出操作
 */
public class TopPageProcess extends ProcessWindowFunction<TopNPage, TopNPage, Long, TimeWindow> {

    @Override
    public void process(Long aLong, ProcessWindowFunction<TopNPage, TopNPage, Long, TimeWindow>.Context context, Iterable<TopNPage> elements, Collector<TopNPage> out) throws Exception {
        TopNPage next = elements.iterator().next();

        out.collect(new TopNPage(next.aid, next.total, context.window().getEnd()));
    }
}
